#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBCONTROL

#include "limits.h"
#include "adt.h"
#include "ynet_rpc.h"
#include "sysy_lib.h"
#include "cluster.h"
#include "chunk.h"
#include "metadata.h"
#include "net_table.h"
#include "configure.h"
#include "lich_md.h"
#include "core.h"
#include "net_global.h"
#include "squeue.h"
#include "rmvol_bh.h"
#include "../replica/replica.h"
#include "pool_ctl.h"
#include "cache.h"
#include "pool_proto.h"
#include "md_proto.h"
#include "ylog.h"
#include "dbg.h"
#include "timer.h"
#include "coroutine.h"
#include "pool_list.h"

static mcache_t *cache;

typedef struct {
        chkinfo_t chkinfo;
        reploc_t __pad__[LICH_REPLICA_MAX];
        char pool[MAX_NAME_LEN];
        poolid_t poolid;
        struct list_head list;
} arg_t;

typedef struct {
        struct list_head hook;
        chkid_t chkid;
} recycle_entry_t;

typedef struct {
        worker_handler_t handler;
        struct list_head list;
} recycle_worker_t;

static co_worker_t *worker;
static recycle_worker_t *recycle;

STATIC int __pool_ctl_get(mcache_entry_t **_cent, const chkid_t *chkid);
STATIC void __pool_ctl_release(mcache_entry_t *cent);
STATIC int __pool_ctl_try_load(const chkid_t *chkid, const fileid_t *parent);
STATIC int __pool_ctl_get__(mcache_entry_t **_cent, const chkid_t *chkid, const fileid_t *parent);
static void __pool_ctl_lease_worker_create(pool_proto_t *pool_proto);

static int __cmp(const void *s1, const void *s2)
{
        const chkid_t *chkid = s1;
        const chkid_t *ent = s2;

        return !chkid_cmp(chkid, ent);
}

static uint32_t __hash(const void *key)
{
        const chkid_t *id = key;

        return id->id;
}

static uint32_t __core_hash(const void *key)
{
        const chkid_t *id = key;

        return core_hash(id);
}

STATIC void __pool_ctl_unlock(mcache_entry_t *cent)
{
        mcache_unlock(cent);
}

STATIC int __pool_ctl_wrlock(mcache_entry_t *cent)
{
        int ret, retry = 0;
        pool_proto_t *pool_proto;
        chkid_t chkid;

retry:
        ret = mcache_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        pool_proto = cent->value;
        if (pool_proto->needreload(pool_proto)) {
                YASSERT(retry < gloconf.rpc_timeout / 2);
                chkid = pool_proto->chkid;
                __pool_ctl_unlock(cent);

                DWARN("chunk "CHKID_FORMAT" need reload\n", CHKID_ARG(&chkid));
                ret = __pool_ctl_try_load(&chkid, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                DINFO("chunk "CHKID_FORMAT" loaded, retry %u\n", CHKID_ARG(&chkid), retry);

                retry++;
                goto retry;
        } else {
                ret = lease_set(&pool_proto->lease);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        }

        return 0;
err_lock:
        mcache_drop_nolock(cent);
        __pool_ctl_unlock(cent);
err_ret:
        return ret;
}

STATIC int __pool_ctl_rdlock(mcache_entry_t *cent)
{
        int ret, retry = 0;
        pool_proto_t *pool_proto;
        chkid_t chkid;

retry:
        ret = mcache_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        pool_proto = cent->value;
        if (pool_proto->needreload(pool_proto)) {
                YASSERT(retry < gloconf.rpc_timeout / 2);
                chkid = pool_proto->chkid;
                __pool_ctl_unlock(cent);

                DWARN("chunk "CHKID_FORMAT" need reload\n", CHKID_ARG(&chkid));
                ret = __pool_ctl_try_load(&chkid, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                DINFO("chunk "CHKID_FORMAT" loaded, %u\n", CHKID_ARG(&chkid), retry);
                retry++;
                goto retry;
        } else {
                ret = lease_set(&pool_proto->lease);
                if (unlikely(ret)) {
                        GOTO(err_lock, ret);
                }
        }

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
        mcache_drop(cent);
err_ret:
        return ret;
}

STATIC void __pool_ctl_drop(mcache_entry_t *cent)
{
        int ret;

        ret = mcache_wrlock_prio(cent, 1);
        if (unlikely(ret)) {
                DWARN("ret %d\n", ret);
                return;
                // UNIMPLEMENTED(__NULL__);
        }

        mcache_drop_nolock(cent);

        __pool_ctl_unlock(cent);
}

STATIC int __pool_ctl_check_reload(const char *pool, mcache_entry_t *cent, const poolid_t *parent,
                                   const poolid_t *poolid)
{
        int ret;
        pool_proto_t *pool_proto, *new;
        char _chkinfo[CHKINFO_MAX];
        chkinfo_t *chkinfo = (void *)_chkinfo;

        ret = mcache_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        pool_proto = cent->value;
        if (pool_proto->needreload(pool_proto) == 0) {
                __pool_ctl_unlock(cent);
                DINFO(""CHKID_FORMAT" loaded\n", CHKID_ARG(poolid));
                goto out;
        }
        
        __pool_ctl_unlock(cent);
        
        /* we need protect the whole load process, not allow modify chunkinfo in loading */
        ret = mcache_wrlock_prio(cent, 1);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        DINFO("reload "CHKID_FORMAT" begin\n", CHKID_ARG(poolid));

        ret = md_chunk_getinfo1(pool, parent, poolid, chkinfo, NULL);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        ret = pool_proto_load(&new, pool, parent, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        pool_proto = cent->value;
        if (gettime() - pool_proto->uptime < 2) {
                DWARN("reload "CHKID_FORMAT" too fast\n", CHKID_ARG(poolid));
        }

        YASSERT(pool_proto->needreload(pool_proto));
        pool_proto_destroy(pool_proto);
        cent->value = new;

        __pool_ctl_lease_worker_create(new); 

        __pool_ctl_unlock(cent);

out:
        return 0;
err_lock:
        mcache_drop_nolock(cent);
        __pool_ctl_unlock(cent);
err_ret:
        return ret;
}

static void __entry_free(void *ent)
{
        pool_proto_destroy(ent);
}

static int __drop(void *value, mcache_entry_t *cent, int recycle)
{
        (void) cent;
        if (value) {
                if (recycle) {
                        pool_proto_t *pool_proto = value;
                        DINFO("recycle "CHKID_FORMAT"\n", &pool_proto->chkid);
                }
                __entry_free(value);
        }

        return 0;
}

int pool_ctl_mkvol(const chkid_t *poolid, const char *name, const char *site_name, const setattr_t *setattr,
                   chkinfo_t *chkinfo)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        if (strchr(name, '/')) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        if (pool_proto->chkid.type != __POOL_CHUNK__) {
                ret = ENOTDIR;
                GOTO(err_lock, ret);
        }

        ret = pool_proto->mkvol(pool_proto, poolid, name, site_name, setattr, chkinfo);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        DINFO("pool "CHKID_FORMAT" mkvol %s\n", CHKID_ARG(poolid), name);

        CHKINFO_DUMP(chkinfo, D_INFO);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

int pool_ctl_mkvolwith(const chkid_t *poolid, const char *name, const chkinfo_t *chkinfo)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        if (pool_proto->chkid.type != __POOL_CHUNK__) {
                ret = ENOTDIR;
                GOTO(err_lock, ret);
        }

        ret = pool_proto->mkvolwith(pool_proto, poolid, name, chkinfo);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

int pool_ctl_mkpool(const chkid_t *poolid, const char *name, const char *site_name,
                    const setattr_t *setattr, chkinfo_t *chkinfo)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        if (strchr(name, '/')) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }
        
        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        if (pool_proto->chkid.type != __POOL_CHUNK__) {
                ret = ENOTDIR;
                GOTO(err_lock, ret);
        }

        ret = pool_proto->mkpool(pool_proto, poolid, name, site_name, setattr, chkinfo);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        DINFO("pool "CHKID_FORMAT" mkpool %s\n", CHKID_ARG(poolid), name);
        CHKINFO_DUMP(chkinfo, D_INFO);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

int pool_ctl_extend(const chkid_t *poolid)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        if (pool_proto->chkid.type != __POOL_CHUNK__) {
                ret = ENOTDIR;
                GOTO(err_lock, ret);
        }

        if (0 == pool_proto->table_count) {
                ret = pool_proto->newtab(pool_proto, NULL);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        }

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

int pool_ctl_listpool_open(const chkid_t *poolid, const char *uuid)
{
        int ret, fd;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        ret = pool_list_newfd(&fd);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        if (pool_proto->chkid.type != __POOL_CHUNK__) {
                ret = ENOTDIR;
                GOTO(err_lock, ret);
        }

        ret = pool_proto->listpool_open(pool_proto, fd);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        DBUG("opendir "CHKID_FORMAT", uuid %s\n", CHKID_ARG(poolid), uuid);
        
        ret = pool_list_addfd(uuid, fd);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        close(fd);
        return ret;
}

int pool_ctl_listpool(const chkid_t *poolid, const char *uuid, uint64_t offset, void *de, int *delen)
{
        int ret;

        DBUG("readdir "CHKID_FORMAT", uuid %s\n", CHKID_ARG(poolid), uuid);

        ret = pool_list_lspool(uuid, offset, de, delen);
        if (unlikely(ret)) {
                DERROR("read %s.list error, uuid:%s, offset:%d, errno:%d, errmsg:%s\n",
                       uuid, uuid, (int)offset, ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        DBUG("buflen %u\n", *delen);

        return 0;
err_ret:
        return ret;
}

int pool_ctl_listpool_close(const chkid_t *poolid, const char *uuid)
{
        int ret;

        ret = pool_list_closefd(uuid);
        if (unlikely(ret)) {
                DINFO("closedir "CHKID_FORMAT", uuid %s already closed\n", CHKID_ARG(poolid), uuid);
        } else {
                DBUG("closedir "CHKID_FORMAT", uuid %s\n", CHKID_ARG(poolid), uuid);
        }

        return 0;
}

int pool_ctl_lookup_srv(const chkid_t *chkid, const fileid_t *parent)
{
        int ret;
        mcache_entry_t *cent;

#if 1
        char pool[MAX_NAME_LEN];
        poolid_t _parent;

        ret = replica_srv_getparent(chkid, &_parent, pool);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = md_proto_migrate(pool, &_parent, chkid, NULL);
        if (unlikely(ret)) {
                if (ret == EPERM)
                        ret = EREMCHG;
                GOTO(err_ret, ret);
        }
#endif
        
        ret = __pool_ctl_get__(&cent, chkid, parent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if 1
        lease_t lease;
        ret = lease_create(&lease, chkid);//check lease
        if (unlikely(ret))
                GOTO(err_release, ret);
        
        ret = lease_set(&lease);
        if (unlikely(ret))
                GOTO(err_release, ret);
#else
        ret = __pool_ctl_rdlock(cent);//check lease
        if (unlikely(ret))
                GOTO(err_release, ret);

        UNIMPLEMENTED(__NULL__);

        __pool_ctl_unlock(cent);
#endif
        
        __pool_ctl_release(cent);

        return 0;
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

int pool_ctl_lookup(const chkid_t *poolid, const char *name,
                    chkinfo_t *chkinfo)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret)) {
                ret = (ret == ENOENT) ? EREMCHG : ret;
                GOTO(err_ret, ret);
        }

        ret = __pool_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        if (pool_proto->chkid.type != __POOL_CHUNK__) {
                ret = ENOTDIR;
                GOTO(err_lock, ret);
        }

        ret = pool_proto->lookup(pool_proto, name, chkinfo);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        YASSERT(chkinfo->id.type == __POOL_CHUNK__ || chkinfo->id.type == __VOLUME_CHUNK__);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

STATIC void __pool_ctl_release(mcache_entry_t *cent)
{
        mcache_release(cent);
#if ENABLE_SCHEDULE_LOCK_CHECK
        schedule_lock_set(0, -1);
#endif
}

STATIC int __pool_ctl_get____(mcache_entry_t **_cent, const chkid_t *chkid)
{
        int ret;
        mcache_entry_t *cent;

        ret = mcache_get(cache, chkid, &cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *_cent = cent;
#if ENABLE_SCHEDULE_LOCK_CHECK
        schedule_lock_set(0, 1);
#endif

        return 0;
err_ret:
        return ret;
}

STATIC int __pool_ctl_get__(mcache_entry_t **_cent, const chkid_t *chkid, const fileid_t *parent)
{
        int ret, retry = 0;
        mcache_entry_t *cent;

        YASSERT(chkid->id);

retry:
        ret = __pool_ctl_get____(&cent, chkid);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        YASSERT(retry == 0);
                        ret = __pool_ctl_try_load(chkid, parent);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        DINFO("chunk "CHKID_FORMAT" loaded, %u\n", CHKID_ARG(chkid), retry);
                        retry++;
                        goto retry;
                } else
                        GOTO(err_ret, ret);
        }

        *_cent = cent;

        return 0;
err_ret:
        return ret;
}

STATIC int __pool_ctl_get(mcache_entry_t **_cent, const chkid_t *chkid)
{
        return __pool_ctl_get__(_cent, chkid, NULL);
}


STATIC int __pool_ctl_chunk_getinfo(pool_proto_t *pool_proto, const chkid_t *chkid, chkinfo_t *chkinfo)
{
        int ret;

        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->chunk_getinfo(pool_proto, chkid, chkinfo);
        if (unlikely(ret)) {
                if (chkid->type != __VOLUME_CHUNK__
                    && chkid->type != __POOL_CHUNK__) {
                        ret = (ret == ENOENT) ? ENOKEY : ret;
                }
                GOTO(err_ret, ret);
        }

        //CHKINFO_DUMP(chkinfo, D_INFO);

        return 0;
err_ret:
        return ret;
}

int pool_ctl_chunk_getinfo(const chkid_t *poolid, const chkid_t *chkid, chkinfo_t *chkinfo)
{
        int ret;
        mcache_entry_t *cent;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret)) {
                ret = (ret == ENOENT) ? EREMCHG : ret;
                GOTO(err_ret, ret);
        }

        ret = __pool_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __pool_ctl_chunk_getinfo(cent->value, chkid, chkinfo);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        YASSERT(chkid_cmp(chkid, &chkinfo->id) == 0);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

STATIC int __pool_ctl_getattr(pool_proto_t *pool_proto, fileinfo_t *fileinfo)
{
        int ret;

        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->getattr(pool_proto, fileinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int pool_ctl_getattr(const chkid_t *chkid, fileinfo_t *fileinfo)
{
        int ret;
        mcache_entry_t *cent;

        ret = __pool_ctl_get(&cent, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __pool_ctl_getattr(cent->value, fileinfo);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

STATIC int __pool_ctl_setattr(pool_proto_t *pool_proto, fileinfo_t *fileinfo, const setattr_t *setattr)
{
        int ret;

        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->setattr(pool_proto, fileinfo, setattr);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int pool_ctl_setattr(const chkid_t *chkid, fileinfo_t *fileinfo, const setattr_t *setattr)
{
        int ret;
        mcache_entry_t *cent;

        ret = __pool_ctl_get(&cent, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __pool_ctl_setattr(cent->value, fileinfo, setattr);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

#if 0
STATIC int __pool_ctl_chunk_move(pool_proto_t *pool_proto, const chkid_t *chkid,
                                 uint32_t op, const reploc_t *dist, int count,
                                 uint32_t force, const nid_t *oper, chkinfo_t *chkinfo)
{
        int ret;

        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->chunk_move(pool_proto, chkid, op, dist, count,
                                     force, oper, chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int pool_ctl_chunk_move(const poolid_t *poolid, const chkid_t *chkid,
                        uint32_t op, const reploc_t *dist, int count, uint32_t force,
                        const nid_t *oper, chkinfo_t *chkinfo)
{
        int ret;
        mcache_entry_t *cent;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __pool_ctl_chunk_move(cent->value, chkid, op, dist, count, force, oper, chkinfo);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}
#endif

STATIC int __pool_ctl_check_parent(const chkid_t *chkid, const fileid_t *_parent, char *pool)
{
        int ret, retry = 0;
        chkinfo_t *chkinfo;
        char _chkinfo[CHKINFO_MAX];
        poolid_t poolid;

        YASSERT(chkid->type == __POOL_CHUNK__);

retry:
        ret = replica_srv_getparent(chkid, &poolid, pool);
        if (unlikely(ret)) {
                if (ret == ENOENT)
                        ret = EREMCHG;
                GOTO(err_ret, ret);
        }

        chkinfo = (void *)_chkinfo;
        ret = md_chunk_getinfo1(pool, &poolid, chkid, chkinfo, NULL);
        if (unlikely(ret)) {
                DWARN("load "CHKID_FORMAT" parent "CHKID_FORMAT", ret: %d\n",
                      CHKID_ARG(chkid), CHKID_ARG(&poolid), ret);

                if (ret == ENOENT && _parent && chkid_cmp(&poolid, _parent)) {
                        YASSERT(retry == 0);

                        DWARN("try load "CHKID_FORMAT" from "CHKID_FORMAT"\n",
                              CHKID_ARG(chkid), CHKID_ARG(_parent));

                        ret = md_chunk_getinfo1(pool, _parent, chkid, chkinfo, NULL);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        ret = replica_srv_setparent(chkid, _parent);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        retry++;
                        goto retry;
                }

                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int pool_ctl_chunk_migrate(const poolid_t *poolid, const chkid_t *chkid, uint32_t force,
                          chkinfo_t *chkinfo)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;
        char pool[MAX_NAME_LEN];
        table_proto_t *table_proto;

        (void) force;

        ret = __pool_ctl_check_parent(chkid, poolid, pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = md_proto_migrate(pool, poolid, chkid, chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_get(&cent, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        table_proto = pool_proto->table_proto;
        CHKINFO_CP(chkinfo, table_proto->chkinfo);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

int pool_ctl_chunk_reject(const poolid_t *poolid, const chkid_t *chkid, const nid_t *bad,
                          chkinfo_t *chkinfo)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        ret = pool_proto->chunk_reject(pool_proto, chkid, bad, chkinfo);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

STATIC int __pool_ctl_chunk_set(pool_proto_t *pool_proto, const chkid_t *chkid,
                                const nid_t *nid, int status)
{
        int ret;

        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->chunk_set(pool_proto, chkid, nid, status);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int pool_ctl_chunk_set(const poolid_t *poolid, const chkid_t *chkid,
                       const nid_t *nid, int status)
{
        int ret;
        mcache_entry_t *cent;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __pool_ctl_chunk_set(cent->value, chkid, nid, status);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

STATIC int __pool_ctl_chunk_update(pool_proto_t *pool_proto, const chkinfo_t *chkinfo,
                                   const nid_t *owner, uint64_t info_version)
{
        int ret;

        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->chunk_update(pool_proto, chkinfo, owner, info_version);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int pool_ctl_chunk_update(const poolid_t *poolid, const chkinfo_t *chkinfo,
                          const nid_t *owner, uint64_t info_version)
{
        int ret;
        mcache_entry_t *cent;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __pool_ctl_chunk_update(cent->value, chkinfo, owner, info_version);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

int pool_ctl_chunk_cleanup(const poolid_t *poolid, const chkid_t *chkid,
                           const nid_t *nid, uint64_t meta_version)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        ret = pool_proto->chunk_cleanup(pool_proto, chkid, nid, meta_version);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

STATIC int __pool_ctl_rmpool(void *ent, const char *name)
{
        int ret;
        pool_proto_t *pool_proto;

        pool_proto = ent;
        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->rmpool(pool_proto, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int pool_ctl_rmpool(const chkid_t *chkid, const char *name)
{
        int ret;
        mcache_entry_t *cent;

        ret = __pool_ctl_get(&cent, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __pool_ctl_rmpool(cent->value, name);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

STATIC int __pool_ctl_unlink(void *ent, const char *name, int force)
{
        int ret;
        pool_proto_t *pool_proto;

        pool_proto = ent;
        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->unlink(pool_proto, name, force);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int pool_ctl_unlink(const chkid_t *chkid, const char *name, int force)
{
        int ret;
        mcache_entry_t *cent;

        ret = __pool_ctl_get(&cent, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __pool_ctl_unlink(cent->value, name, force);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

STATIC int __pool_ctl_cleanup(void *ent, const char *name)
{
        int ret;
        pool_proto_t *pool_proto;

        pool_proto = ent;
        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->cleanup(pool_proto, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int pool_ctl_cleanup(const chkid_t *chkid, const char *name)
{
        int ret;
        mcache_entry_t *cent;

        ret = __pool_ctl_get(&cent, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __pool_ctl_cleanup(cent->value, name);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

int __pool_ctl_cleanup0(va_list ap)
{
        const chkid_t *chkid = va_arg(ap, const chkid_t *);
        const char *name = va_arg(ap, const char *);
        va_end(ap);

        int ret;

        ret = pool_ctl_cleanup(chkid, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int pool_ctl_cleanup0(const chkid_t *chkid, const char *name)
{
        int ret;

        ret = core_request(core_hash(chkid), -1, "pool_ctl_cleanup", __pool_ctl_cleanup0, chkid, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

STATIC int __pool_ctl_rename_lock(void *ent, const poolid_t *src, const char *name, int force)
{
        int ret;
        pool_proto_t *pool_proto;

        pool_proto = ent;
        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->rename_lock(pool_proto, src, name, force);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int pool_ctl_rename_lock(const poolid_t *poolid, const poolid_t *src, const char *name, int force)
{
        int ret;
        mcache_entry_t *cent;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __pool_ctl_rename_lock(cent->value, src, name, force);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

STATIC int __pool_ctl_rename_unlock(void *ent, const chkinfo_t *chkinfo)
{
        int ret;
        pool_proto_t *pool_proto;

        pool_proto = ent;
        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->rename_unlock(pool_proto, chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int pool_ctl_rename_unlock(const poolid_t *poolid, const chkinfo_t *chkinfo)
{
        int ret;
        mcache_entry_t *cent;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __pool_ctl_rename_unlock(cent->value, chkinfo);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

STATIC int __pool_ctl_rename(void *ent, const poolid_t *from, const char *fname,
                             const poolid_t *to, const char *tname)
{
        int ret;
        pool_proto_t *pool_proto;

        pool_proto = ent;
        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        DINFO("rename %s to %s\n", fname, tname);
        ret = pool_proto->rename(pool_proto, from, fname, to, tname);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int pool_ctl_rename(const poolid_t *from, const char *fname,
                    const poolid_t *to, const char *tname)
{
        int ret;
        mcache_entry_t *cent;

        ret = __pool_ctl_get(&cent, from);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __pool_ctl_rename(cent->value, from, fname, to, tname);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

int pool_ctl_chunk_move(const poolid_t *poolid, const chkid_t *chkid, const nid_t *dist, int dist_count)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->chunk_move(pool_proto, chkid, dist, dist_count);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

int pool_ctl_chunk_check(const poolid_t *poolid, const chkid_t *chkid)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->chunk_check(pool_proto, chkid);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

int pool_ctl_xattr_set(const poolid_t *poolid, const char *key, const char *value,
                       uint32_t valuelen, int flag)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->xattr_set(pool_proto, key, value, valuelen, flag);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

int pool_ctl_xattr_get(const poolid_t *poolid, const char *key, char *value, int *valuelen)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->xattr_get(pool_proto, key, value, valuelen);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

int pool_ctl_xattr_list(const fileid_t *fileid, char *buf, int *buflen)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        ret = __pool_ctl_get(&cent, fileid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->xattr_list(pool_proto, buf, buflen);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

int pool_ctl_xattr_remove(const poolid_t *poolid, const char *key)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->xattr_remove(pool_proto, key);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

STATIC int __pool_ctl_diskcheck(const char *pool, const poolid_t *poolid, const chkid_t *chkid)
{
        int ret, online;

        ret = replica_srv_diskonline(chkid, &online);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        //YASSERT(online == 0);

        ret = md_chunk_reject(pool, poolid, chkid, net_getnid(), NULL, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

typedef struct {
        chkid_t chkid;
        uint32_t magic;
        ytime_t time;
} lease_check_t;

static int __pool_ctl_lease(lease_check_t *lease_check, uint64_t i)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        ret = __pool_ctl_get(&cent, &lease_check->chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mcache_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        
        if (pool_proto->needreload(pool_proto)) {
                ret = ESTALE;
                DINFO("renew "CHKID_FORMAT" exit, need reload ret %d\n", CHKID_ARG(&lease_check->chkid), ret);
                GOTO(err_lock, ret);
       }

#if 1
        chkinfo_t *chkinfo;
        char _chkinfo[CHKINFO_MAX];
        chkinfo = (void *)_chkinfo;
        ret = md_chunk_getinfo1(pool_proto->pool, &pool_proto->parentid,
                                &pool_proto->chkid, chkinfo, NULL);
        if (unlikely(ret)) {
                DINFO("renew "CHKID_FORMAT" exit, getinfo fail ret %d\n", CHKID_ARG(&lease_check->chkid), ret);
                GOTO(err_lock, ret);
        }

        /*
         * 此过程并不能保证当前节点仍在续之中,
         * 其他节点可能已经获取并且通过migrate修改了副本分布
         */
        if (!net_islocal(&chkinfo->diskid[0].id)) {
                ret = EREMCHG;
                DINFO("renew "CHKID_FORMAT" exit, ret %d\n",
                      CHKID_ARG(&lease_check->chkid), ret);
                GOTO(err_lock, ret);
        }

        //YASSERT(chkinfo->info_version == pool_proto->table_proto->chkinfo->info_version);
        if (chkinfo->info_version != pool_proto->table_proto->chkinfo->info_version) {
                ret = EPERM;
                CHKINFO_DUMP(chkinfo, D_INFO);
                CHKINFO_DUMP(pool_proto->table_proto->chkinfo, D_INFO);
                DINFO("renew "CHKID_FORMAT" exit, ret %d\n",
                      CHKID_ARG(&lease_check->chkid), ret);
                GOTO(err_lock, ret);
        }
#endif

        if (pool_proto->magic != lease_check->magic) {
                ret = ESTALE;
                DINFO("renew "CHKID_FORMAT" exit, reloaded, ret %d\n",
                      CHKID_ARG(&lease_check->chkid), ret);
                GOTO(err_lock, ret);
       }

        ret = pool_proto_renew(cent->value);
        if (unlikely(ret)) {
                mcache_drop_nolock(cent);
                DINFO("renew "CHKID_FORMAT" fail ret %d\n",
                      CHKID_ARG(&lease_check->chkid), ret);
                GOTO(err_lock, ret);
        }
        
        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        if (i % 100  == 0) {
                ytime_t now = ytime_gettime();
                DINFO("renew "CHKID_FORMAT" success %ju\n",
                      CHKID_ARG(&lease_check->chkid), i, (now - lease_check->time) / USECONDS_PER_SEC);
                lease_check->time = now;
        }
        
        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

static void __pool_ctl_lease_worker(void *arg)
{
        int ret;
        lease_check_t *lease_check = arg;
        uint64_t i = 0;
        
        DINFO("start "CHKID_FORMAT" lease worker, magic %x\n",
                        CHKID_ARG(&lease_check->chkid), lease_check->magic);

        while (1) {
                schedule_sleep("pool_lease", 1000 * 1000 * gloconf.lease_timeout / 2);

                ret = __pool_ctl_lease(lease_check, i);
                if (ret) {
                        DINFO("exit "CHKID_FORMAT" lease worker, magic %x\n",
                              CHKID_ARG(&lease_check->chkid), lease_check->magic);
                        yfree((void **)&lease_check);
                        break;
                }

                i++;
        }
}

static void __pool_ctl_lease_worker_create(pool_proto_t *pool_proto)
{
        int ret;
        lease_check_t *lease_check;

        ret = ymalloc((void **)&lease_check, sizeof(*lease_check));
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        pool_proto->magic = fastrandom();
        lease_check->chkid = pool_proto->chkid;
        lease_check->magic = pool_proto->magic;
        lease_check->time = ytime_gettime();
        
        DINFO("create "CHKID_FORMAT" lease worker, magic %x time %ju\n",
              CHKID_ARG(&lease_check->chkid), lease_check->magic, lease_check->time);
        
        schedule_task_new("pool_lease", __pool_ctl_lease_worker, lease_check, -1);
}

STATIC int __pool_ctl_load_bh(const char *pool, const poolid_t *poolid, const chkid_t *chkid)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;
        char _chkinfo[CHKINFO_MAX];
        chkinfo_t *chkinfo = (void *)_chkinfo;

        YASSERT(chkid->id);

retry:
        ret = __pool_ctl_get____(&cent, chkid);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        DINFO("load "CHKID_FORMAT" start\n", CHKID_ARG(chkid));
                        ret = md_chunk_getinfo1(pool, poolid, chkid, chkinfo, NULL);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        ret = pool_proto_load(&pool_proto, pool, poolid, chkinfo);
                        if (unlikely(ret)) {
                                if (ret == ENOSPC) {
                                        ret = __pool_ctl_diskcheck(pool, poolid, chkid);
                                        if (unlikely(ret))
                                                GOTO(err_ret, ret);

                                        ret = EREMCHG;
                                        GOTO(err_ret, ret);
                                } else
                                        GOTO(err_ret, ret);
                        }

                        ret = mcache_insert(cache, chkid, pool_proto);
                        if (unlikely(ret)) {
                                if (ret == EEXIST) {
                                        pool_proto_destroy(pool_proto);
                                        DWARN("load "CHKID_FORMAT" exist\n", CHKID_ARG(chkid));
                                        goto retry;
                                } else
                                        GOTO(err_ret, ret);
                        }

                        __pool_ctl_lease_worker_create(pool_proto);
                } else
                        GOTO(err_ret, ret);
        } else {
                DINFO("chunk "CHKID_FORMAT" reload\n", CHKID_ARG(chkid));

                ret = __pool_ctl_check_reload(pool, cent, poolid, chkid);
                if (unlikely(ret)) {
                        GOTO(err_release, ret);
                }

		__pool_ctl_release(cent);
        }

        DINFO("load "CHKID_FORMAT" finish\n", CHKID_ARG(chkid));

        return 0;
err_release:
        __pool_ctl_release(cent);
err_ret:
        DBUG("load "CHKID_FORMAT" fail, ret:%d\n", CHKID_ARG(chkid), ret);
        return ret;
}

STATIC void  __pool_load_bh_resume(const chkid_t *chkid, arg_t *arg, int retval)
{
        struct list_head *pos, *n;
        co_wait_task_t *wait_arg;

        (void) chkid;
        
        list_for_each_safe(pos, n, &arg->list) {
                wait_arg = (void *)pos;
                list_del(pos);
                schedule_resume(&wait_arg->task, retval, NULL);
        }
}

STATIC int  __pool_load_bh_cleanup(const chkid_t *chkid, arg_t *_arg, int retval)
{
        int ret;
        arg_t *arg;

        (void) chkid;
        
        ret = sy_spin_lock(&worker->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = squeue_remove(&worker->queue, chkid, (void **)&arg);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_load_bh_resume(chkid, arg, retval);
        
        sy_spin_unlock(&worker->lock);

        YASSERT(_arg == arg);

        yfree((void **)&_arg);

        return 0;
err_lock:
        sy_spin_unlock(&worker->lock);
err_ret:
        yfree((void **)&_arg);
        return ret;
}

STATIC int  __pool_load_bh_fail(const chkid_t *chkid, arg_t *arg, int retval)
{
        int ret;

        DBUG("load "CHKID_FORMAT" fail\n", CHKID_ARG(chkid));
        
        ret = sy_spin_lock(&worker->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        __pool_load_bh_resume(chkid, arg, retval);
        
        sy_spin_unlock(&worker->lock);
        
        return 0;
err_ret:
        return ret;
}

STATIC void __pool_load_bh(void *_arg)
{
        int ret, retry = 0;
        arg_t *arg;
        time_t now, begin = gettime();

        arg = _arg;

retry:
        ret = __pool_ctl_load_bh(arg->pool, &arg->poolid, &arg->chkinfo.id);
        if (ret) {
                now = gettime();
                if (ret == EREMCHG || ret == ENOENT
                    || now - begin > gloconf.lease_timeout) {
                        DWARN("load "CHKID_FORMAT" total %u (%u) %s\n",
                              CHKID_ARG(&arg->chkinfo.id), (int)(now - begin), ret, strerror(ret));
                        GOTO(err_ret, ret);
                } else {
                        __pool_load_bh_fail(&arg->chkinfo.id, arg,
                                              ret == EREMCHG ? ret : ESTALE);
                        USLEEP_RETRY(err_ret, ret, retry, retry, 100, (100 * 1000));
                }
        }

        ret = __pool_load_bh_cleanup(&arg->chkinfo.id, arg, 0);
        YASSERT(ret == 0);

        return;
err_ret:
        if (ret == ENOENT) {
                ret = __pool_load_bh_cleanup(&arg->chkinfo.id, arg,
                                             ret);
        } else {
                ret = __pool_load_bh_cleanup(&arg->chkinfo.id, arg,
                                             ret == EREMCHG ? ret : ESTALE);
        }
        YASSERT(ret == 0);
}

static uint32_t __key_from_int(const void *i)
{
        return ((chkid_t *)i)->id;
}

static int __equal(const void *key, const void *data)
{
        const chkid_t *id = key;
        const squeue_entry_t *sent = data;
        arg_t *arg;

        arg = sent->ent;

        DBUG("chkid "CHKID_FORMAT" --- "CHKID_FORMAT"\n", CHKID_ARG(id),
             CHKID_ARG(&arg->chkinfo.id));

        return !chkid_cmp(id, &arg->chkinfo.id);
}

STATIC int __pool_ctl_loader_init()
{
        int ret;

        ret = ymalloc((void **)&worker, sizeof(*worker));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = sy_spin_init(&worker->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = squeue_init(&worker->queue, 1024, __equal, __key_from_int);
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if 0
        ret = worker_create(&worker->sem, "dir_load", __dir_load_worker,
                            NULL, NULL, WORKER_TYPE_SEM, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        return 0;
err_ret:
        return ret;
}

STATIC void __pool_recycle_collect(void *arg, void *ent)
{
        int ret;
        recycle_entry_t *rent;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        (void) arg;
        cent = ent;
        pool_proto = cent->value;

        ret = ymalloc((void **)&rent, sizeof(*rent));
        if (unlikely(ret))
                goto out;

        rent->chkid = pool_proto->chkid;
        list_add(&rent->hook, &recycle->list);

out:
        return;
}

STATIC int __pool_recycle_handle()
{
        int ret;
        recycle_entry_t *rent, *tmp;
        chkid_t *chkid;
        mcache_entry_t *cent;

        list_for_each_entry_safe(rent, tmp, &recycle->list, hook) {
                chkid = &rent->chkid;

                ret = __pool_ctl_get(&cent, chkid);
                if (!ret) {
                        __pool_ctl_release(cent);
                }

                list_del(&rent->hook);
                yfree((void **)&rent);
        }

        return 0;
}

STATIC int __pool_recycle_worker(void *_arg)
{
        int ret;
        (void) _arg;

        mcache_iterator(cache, __pool_recycle_collect, NULL);

        ret = __pool_recycle_handle();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = timer1_settime(&recycle->handler, USEC_PER_DAY);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

STATIC int __pool_ctl_recycle_init()
{
        int ret;

        ret = ymalloc((void **)&recycle, sizeof(*recycle));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        INIT_LIST_HEAD(&recycle->list);

        ret = timer1_create(&recycle->handler, "pool_recycle", __pool_recycle_worker, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = timer1_settime(&recycle->handler, USEC_PER_DAY);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

STATIC int __pool_load_wait__(const char *pool, const poolid_t *poolid, const chkinfo_t *_chkinfo)
{
        int ret;
        arg_t *arg;
        co_wait_task_t wait_arg;

        ret = sy_spin_lock(&worker->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = squeue_get(&worker->queue, &_chkinfo->id, (void **)&arg);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        ret = ymalloc((void**)&arg, sizeof(*arg));
                        if (unlikely(ret))
                                GOTO(err_lock, ret);

                        strcpy(arg->pool, pool);
                        arg->poolid = *poolid;
                        INIT_LIST_HEAD(&arg->list);

                        memcpy(&arg->chkinfo, _chkinfo, CHKINFO_SIZE(_chkinfo->repnum));

                        ret = squeue_insert(&worker->queue, &_chkinfo->id, arg, 0);
                        if (unlikely(ret))
                                GOTO(err_free, ret);

                        DINFO("require load %s\n", id2str(&_chkinfo->id));

                        wait_arg.task = schedule_task_get();
                        list_add(&wait_arg.hook, &arg->list);

                        schedule_task_new("pool_load", __pool_load_bh, arg, -1);
                        //worker_post(&worker->sem);
                } else
                        GOTO(err_lock, ret);
        } else {
                DBUG("%s loading\n", id2str(&_chkinfo->id));
                wait_arg.task = schedule_task_get();
                list_add(&wait_arg.hook, &arg->list);
        }

        sy_spin_unlock(&worker->lock);

        ANALYSIS_BEGIN(0);

        ret = schedule_yield("pool_loading", NULL, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_END(0, 1000 * 100, id2str(&_chkinfo->id));

        return 0;
err_free:
        yfree((void **)&arg);
err_lock:
        sy_spin_unlock(&worker->lock);
err_ret:
        return ret;
}

STATIC int __pool_load_wait_request(va_list ap)
{
        int ret;
        const char *pool = va_arg(ap, char *);
        const poolid_t *poolid = va_arg(ap, poolid_t *);
        const chkinfo_t *chkinfo = va_arg(ap, chkinfo_t *);

        va_end(ap);

        ret = __pool_load_wait__(pool, poolid, chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

STATIC int __pool_load_wait(const char *pool, const poolid_t *poolid, const chkinfo_t *_chkinfo)
{
        int ret;

        if (schedule_self()) {
                ret = __pool_load_wait__(pool, poolid, _chkinfo);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = core_request(core_hash(poolid), -1, "pool_load",
                                   __pool_load_wait_request, pool, poolid, _chkinfo);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

STATIC int __pool_ctl_try_load(const chkid_t *chkid, const fileid_t *_parent)
{
        int ret, retry = 0;
        chkinfo_t *chkinfo;
        char _chkinfo[CHKINFO_MAX], pool[MAX_NAME_LEN];
        poolid_t poolid;

        ANALYSIS_BEGIN(0);

        YASSERT(chkid->type == __POOL_CHUNK__);

retry:
        ret = replica_srv_getparent(chkid, &poolid, pool);
        if (unlikely(ret)) {
                if (ret == ENOENT)
                        ret = EREMCHG;
                GOTO(err_ret, ret);
        }

        chkinfo = (void *)_chkinfo;
        ret = md_chunk_getinfo1(pool, &poolid, chkid, chkinfo, NULL);
        if (unlikely(ret)) {
                DBUG("load "CHKID_FORMAT" parent "CHKID_FORMAT", ret: %d\n",
                     CHKID_ARG(chkid), CHKID_ARG(&poolid), ret);

                if (ret == ENOENT && _parent && chkid_cmp(&poolid, _parent)) {
                        YASSERT(retry == 0);

                        DWARN("try load "CHKID_FORMAT" from "CHKID_FORMAT"\n",
                              CHKID_ARG(chkid), CHKID_ARG(_parent));

                        ret = md_chunk_getinfo1(pool, _parent, chkid, chkinfo, NULL);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        ret = replica_srv_setparent(chkid, _parent);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        retry++;
                        goto retry;
                }

                GOTO(err_ret, ret);
        }

        // is first replica node?
        if (!net_islocal(&chkinfo->diskid[0].id)) {
                ret = EREMCHG;
                GOTO(err_ret, ret);
        }

        ret = __pool_load_wait(pool, &poolid, chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        CHKINFO_DUMP(chkinfo, D_INFO);
        ANALYSIS_END(0, 1000 * 100, id2str(chkid));

        return 0;
err_ret:
        return ret;
}

int pool_ctl_chunk_sync(const poolid_t *poolid, const chkid_t *chkid)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->chunk_sync(pool_proto, chkid);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

int pool_ctl_chunk_sync_force(const poolid_t *poolid, const chkid_t *chkid, const nid_t *srcnid)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);

        ret = pool_proto->chunk_set(pool_proto, chkid, srcnid, __S_CHECK);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ret = pool_proto->chunk_sync(pool_proto, chkid);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}

int pool_ctl_init(uint64_t max_chunk)
{
        int ret;

        ret = mcache_init(&cache, max_chunk, __cmp, __hash, __core_hash,
                          __drop, 0, "pool_ctl");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_loader_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_recycle_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = pool_list_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int pool_ctl_chunk_iterator(const poolid_t *poolid, func2_t func2, void *_arg)
{
        int ret;
        mcache_entry_t *cent;
        pool_proto_t *pool_proto;

        ret = __pool_ctl_get(&cent, poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        pool_proto = cent->value;
        YASSERT(pool_proto->chkid.type == __POOL_CHUNK__);
        ret = pool_proto->chunk_iterator(pool_proto, func2, _arg);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __pool_ctl_unlock(cent);
        __pool_ctl_release(cent);

        return 0;
err_lock:
        __pool_ctl_unlock(cent);
err_release:
        __pool_ctl_release(cent);
err_ret:
        return ret;
}
